
/**
 * FileName: ColumnsUtil
 * Author:   SAMSUNG-PC 孙中军
 * Date:     2019/1/8 17:50
 * Description:
 */

package cn.com.bonc.util;



import cn.com.bonc.conf.ConfigurationManager;
import cn.com.bonc.constant.Constants;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import javax.inject.Singleton;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.split;

/**
 * Singleton helper that turns a single delimited column into multiple named columns.
 *
 * <p>The mapping comes from the column-mapping properties loaded at construction time:
 * each key is the numeric position of a field inside the split array and each value is
 * the name given to the resulting column.
 */
public class ColumnsUtil {

    /**
     * Characters treated as "special" in a separator. The pattern text is kept exactly as
     * it was originally written; it is compiled once instead of on every call.
     */
    private static final Pattern SPECIAL_CHAR_PATTERN = Pattern.compile(
            "[ _`~!@#$%^&*()+=|{}':;',\\[\\].<>/?~！@#￥%……&*（）——+|{}【】‘；：”“’。，、？]");

    /** Column mapping: array index (as a string key) to output column name. */
    private final Properties properties;

    /**
     * Loads the column mapping through {@code ExternalResourceUtil}.
     *
     * @throws IllegalStateException if the mapping file cannot be read; previously the
     *         error was only printed and the constructor then failed with an unexplained
     *         {@code NullPointerException} on the unset properties field
     */
    private ColumnsUtil() {
        Properties loaded;
        try {
            loaded = ExternalResourceUtil.loadColumnMappingPropertiesFileData();
        } catch (IOException e) {
            // Fail fast and keep the cause, instead of continuing with a null mapping.
            throw new IllegalStateException("column-mapping.properties配置文件加载出错", e);
        }
        this.properties = loaded;
    }

    /** Holder class: the instance is created when this nested class is first initialized. */
    private static class SingletonInstance {
        private static final ColumnsUtil instance = new ColumnsUtil();
    }

    /**
     * Returns the shared instance.
     *
     * @return the single {@code ColumnsUtil} instance
     */
    public static ColumnsUtil getInstance() {
        return SingletonInstance.instance;
    }

    /**
     * Builds one column expression per mapping entry, ordered by ascending numeric key.
     * For an entry {@code k=name} the expression is {@code col(colName).getItem(k).as(name)}.
     *
     * @param colName name of the column that holds the split values
     * @return the column expressions, one per mapping entry
     * @throws NumberFormatException if a mapping key is not an integer
     */
    public Column[] getColumns(String colName) {
        List<Integer> sortedKeys = properties
                .stringPropertyNames()
                .stream()
                .map(Integer::valueOf)
                .sorted()
                .collect(Collectors.toList());

        // Size the array from the keys actually iterated so the two can never disagree.
        Column[] columns = new Column[sortedKeys.size()];
        int index = 0;
        for (Integer key : sortedKeys) {
            String alias = properties.getProperty(key.toString());
            columns[index++] = col(colName).getItem(key).as(alias);
        }
        return columns;
    }

    /**
     * Appends extra columns to an existing column array.
     *
     * @param arrayCols the base column array
     * @param mutiCols zero or more columns to append
     * @return {@code arrayCols} itself when nothing is appended, otherwise a new array
     *         holding the base columns followed by the appended ones
     */
    public Column[] combineColumns(Column[] arrayCols, Column... mutiCols) {
        if (mutiCols.length == 0) {
            return arrayCols;
        }
        int baseLength = arrayCols.length;
        Column[] combined = Arrays.copyOf(arrayCols, baseLength + mutiCols.length);
        System.arraycopy(mutiCols, 0, combined, baseLength, mutiCols.length);
        return combined;
    }

    /**
     * Splits one column of a dataset into multiple named columns, using the separator
     * returned by {@link #getRegex()} and the names produced by {@link #getColumns(String)}.
     *
     * @param singleColumnDataset the source dataset
     * @param colName the column to split (typically {@code value})
     * @return a dataset with one column per mapping entry
     */
    public Dataset<Row> getMultiColumnDataset(Dataset<Row> singleColumnDataset, String colName) {
        return singleColumnDataset
                .select(col(colName).cast("string"))
                .withColumn("tmp", split(col(colName), getRegex()))
                .select(getColumns("tmp"))
                // Kept from the original pipeline; "tmp" is not among the selected aliases
                // unless the mapping itself names a column "tmp".
                .drop(col("tmp"));
    }

    /**
     * Loads {@code column-mapping.properties} from the classpath (inside the jar).
     *
     * @return the loaded properties
     * @throws IOException if the resource is missing or cannot be read
     */
    private Properties loadInternalProperties() throws IOException {
        final String resourcePath = "column-mapping.properties";
        // try-with-resources closes the stream, which was previously leaked.
        try (InputStream inputStream =
                     ColumnsUtil.class.getClassLoader().getResourceAsStream(resourcePath)) {
            if (inputStream == null) {
                // getResourceAsStream returns null for a missing resource.
                throw new IOException("Resource not found on classpath: " + resourcePath);
            }
            Properties loaded = new Properties();
            loaded.load(inputStream);
            return loaded;
        }
    }

    /**
     * Tells whether the given string contains at least one character from the
     * special-character set.
     *
     * @param str the separator to inspect
     * @return {@code true} if any special character is found
     */
    private static boolean isSpecialChar(String str) {
        return SPECIAL_CHAR_PATTERN.matcher(str).find();
    }

    /**
     * Returns the configured data separator as a regular expression.
     *
     * <p>If the separator contains a special character, every character is wrapped in its
     * own character class ({@code |} becomes {@code [|]}) so that it matches literally.
     * Otherwise the configured value is returned unchanged.
     *
     * @return the separator regex
     */
    public static String getRegex() {
        String regex = ConfigurationManager.getProperty(Constants.DEFAULT_DATA_SEPARATOR);
        if (!isSpecialChar(regex)) {
            return regex;
        }
        StringBuilder wrapped = new StringBuilder();
        for (char c : regex.toCharArray()) {
            wrapped.append("[");
            // "[^]" and "[[]" are unclosed character classes in java.util.regex, so these
            // two characters must be escaped to be matched literally.
            if (c == '^' || c == '[') {
                wrapped.append('\\');
            }
            wrapped.append(c).append("]");
        }
        return wrapped.toString();
    }

}